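/**
 * @file
 *
 * Throughput and latency benchmark for the Gestalt client, driven by YCSB
 * load/run traces and repeated over increasing worker-thread counts.
 */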

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <map>
#include <memory>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
using namespace std::chrono_literals;

#include "common/boost_log_helper.hpp"
#include <boost/program_options.hpp>

#include "defaults.hpp"
#include "optim.hpp"
#include "ycsb_parser.hpp"
#include "client.hpp"
#include "perf/client_perf.hpp"
#include "../tweaks.hpp"

using namespace std;


/**
 * Copy @p key, NUL-terminated, to the start of the @p cap -byte buffer @p dst.
 *
 * The benchmark does not send meaningful payload contents; it only stamps the
 * key into the buffer. The copy is truncated so that at most @p cap bytes
 * (terminator included) are written. A raw strcpy() would overflow the buffer
 * for keys of @p cap bytes or longer.
 */
static void stamp_key_into_payload(void *dst, const std::size_t cap, const std::string &key)
{
    if (cap == 0)
        return;
    const std::size_t len = std::min(key.size(), cap - 1);
    std::memcpy(dst, key.data(), len);
    static_cast<char *>(dst)[len] = '\0';
}

/**
 * Gestalt client throughput / latency benchmark driven by YCSB traces.
 *
 * Steps:
 *  1. parse command line options and locate the configuration file;
 *  2. parse the YCSB load and run traces;
 *  3. insert every key of the load trace through a single client
 *     (insert collisions reported as -EDQUOT are ignored);
 *  4. for each thread count in {1, 2, 4, 8, 16, 32, 48, 64} (capped by
 *     --max-workers), replay a per-thread shuffled copy of the run trace on
 *     that many threads, each with its own client, for --duration seconds and
 *     count completed operations;
 *  5. optionally dump per-thread latency records under --record-latency, then
 *     log a summary table (average latency, Miops and bandwidth assuming 4_K
 *     bytes per operation).
 *
 * @return EXIT_SUCCESS on completion; EXIT_FAILURE if no configuration file is
 *         found, the duration is zero or the run trace is empty.
 * @throws std::runtime_error if --record-latency names an existing
 *         non-directory. Client failures are reported through
 *         boost_log_errno_throw() (presumably throws; defined elsewhere).
 */
int main(const int argc, const char **argv)
{
    /* default trace paths are resolved relative to the executable, three
        directory levels up */
    const filesystem::path src_dir =
        filesystem::absolute(argv[0]).parent_path().parent_path().parent_path();
    filesystem::path config_path;
    filesystem::path ycsb_load_path;
    filesystem::path ycsb_run_path;
    string log_level;
    unsigned client_id{};
    unsigned max_workload_threads{};  // 0 for no limit
    unsigned bench_duration_sec{};
    filesystem::path record_latency_basedir; // if empty don't record
    {
        namespace po = boost::program_options;
        po::options_description desc;
        desc.add_options()
            ("config,C", po::value(&config_path),
                "Configuration file, if not given, will search for "
                "/etc/gestalt/gestalt.conf, ./gestalt.conf, "
                "./etc/gestalt/gestalt.conf, whichever comes first.")
            ("log,L", po::value(&log_level)->default_value("info"),
                "Logging level (Boost).")
            ("id,i", po::value(&client_id)->default_value(114),
                "Specify client ID seed")
            ("max-workers,M", po::value(&max_workload_threads)->default_value(48),
                "Maximum number of parallel workload-generating threads, 0 for no limit")
            ("duration,D", po::value(&bench_duration_sec)->default_value(10),
                "Benchmark duration in seconds.")
            ("record-latency,R", po::value(&record_latency_basedir),
                "Output directory of recorded per-operation latency, will not "
                "record if not given.")
            ("ycsb-load", po::value(&ycsb_load_path)
                    ->default_value(src_dir/"build"/"runtime"/"load.ycsb"),
                "YCSB load output")
            ("ycsb-run", po::value(&ycsb_run_path)
                    ->default_value(src_dir/"build"/"runtime"/"run.ycsb"),
                "YCSB run output")
            ;
        po::variables_map vm;
        po::store(po::parse_command_line(argc, argv, desc), vm);
        po::notify(vm);
    }

    set_boost_log_level(log_level);

    /* every metric below divides by the duration */
    if (bench_duration_sec == 0) {
        BOOST_LOG_TRIVIAL(fatal) << "Benchmark duration must be at least 1 second";
        return EXIT_FAILURE;
    }

    /* fall back to the first existing default configuration file */
    if (config_path.empty()) {
        for (auto &p : gestalt::defaults::config_paths) {
            if (!filesystem::is_regular_file(p))
                continue;
            config_path = p;
            break;
        }
    }
    if (!filesystem::is_regular_file(config_path)) {
        BOOST_LOG_TRIVIAL(fatal) << "Cannot find configuration file";
        return EXIT_FAILURE;
    }

    const bool record_latency = !record_latency_basedir.empty();
    if (record_latency) {
        if (filesystem::exists(record_latency_basedir)
                && !filesystem::is_directory(record_latency_basedir)) {
            ostringstream what;
            what << record_latency_basedir << " exists but is not a directory";
            throw std::runtime_error(what.str());
        }
        /* clean-up dump directory, directory structure to be generated with
            fs::create_directories() */
        filesystem::remove_all(record_latency_basedir);
    }

    /* prepare YCSB data */
    smdsbz::ycsb_parser::trace ycsb_load, ycsb_run;
    {
        namespace yp = smdsbz::ycsb_parser;
        ycsb_load.reserve(10'000);
        yp::parse(ycsb_load_path, ycsb_load);
        ycsb_run.reserve(100'000);
        yp::parse(ycsb_run_path, ycsb_run);
    }
    /* workers index their trace cyclically, an empty trace would be read out
        of bounds and taken modulo zero */
    if (ycsb_run.empty()) {
        BOOST_LOG_TRIVIAL(fatal) << "YCSB run trace " << ycsb_run_path << " is empty";
        return EXIT_FAILURE;
    }
    BOOST_LOG_TRIVIAL(info) << "YCSB workload loaded";


    /* setup client */

    gestalt::Client client(config_path, client_id);
    BOOST_LOG_TRIVIAL(info) << "client successfully setup";

    /* load, and heat up client locator cache */
    /** @note insert collisions will be ignored */
    size_t successful_insertions = 0;
    {
        BOOST_LOG_TRIVIAL(info) << "Loading workload into Gestalt ...";
        for (const auto &d : ycsb_load) {
            uint8_t buf[4_K];
            /* HACK: the payload is not real data, only the key is stamped at
                its start. Per the original author this does not affect
                performance. */
            stamp_key_into_payload(buf, sizeof(buf), d.okey);
            int r = client.put(d.okey.c_str(), buf, sizeof(buf));
            if (!r) {
                successful_insertions++;
                continue;
            }
            /* -EDQUOT is treated as a tolerated insert failure (collision) */
            if (r == -EDQUOT) {
                BOOST_LOG_TRIVIAL(trace) << "failed inserting key " << d.okey
                    << ", ignored";
                continue;
            }
            errno = -r;
            boost_log_errno_throw(Client::put);
        }
        /* avoid 0/0 when the load trace is empty */
        const double loaded_pct = ycsb_load.empty()
            ? 0. : 100. * successful_insertions / ycsb_load.size();
        BOOST_LOG_TRIVIAL(info) << "Finished loading workload, loaded "
            << successful_insertions << " / " << ycsb_load.size()
            << " (" << loaded_pct << "%)";
    }

    /* run, multi-threaded for bandwidth, single-threaded falls back to latency test */

    /* generate scrambled YCSB run trace for each thread, minimizing CPU cache
        miss impact introduced when copying user blob data to I/O request body */
    vector<unsigned> thread_nr_to_test{1, 2, 4, 8, 16, 32, 48, 64}; // must be in asc order
    /* apply worker thread limiter; 1 always survives since the limit is >= 1
        whenever it is non-zero */
    while (max_workload_threads && thread_nr_to_test.back() > max_workload_threads)
        thread_nr_to_test.pop_back();
    BOOST_LOG_TRIVIAL(info) << "Generating trace for each thread ...";
    /* one trace per thread of the largest configuration; each is the same
        length as the run trace, sampled from it uniformly with replacement */
    vector<decltype(ycsb_run)> thread_run(thread_nr_to_test.back());
    {
        std::random_device rd;
        std::default_random_engine re(rd());
        std::uniform_int_distribution<size_t> dist(0, ycsb_run.size() - 1);
        for (auto &tt : thread_run) {
            tt.reserve(ycsb_run.size());
            for (size_t i = 0; i < ycsb_run.size(); i++)
                tt.push_back(ycsb_run.at(dist(re)));
        }
    }
    BOOST_LOG_TRIVIAL(info) << "Thread-specific trace generated";

    /* Start/end signals shared with the workers. These are std::atomic because
        volatile gives no inter-thread guarantees, and concurrent read/write of
        a volatile bool is a data race (undefined behavior). */
    std::atomic<bool> thread_start_flag{false}, thread_end_flag{false};
    std::atomic<unsigned> thread_ready_count{0};
    std::atomic<unsigned long long> total_completes_ops{0};
    // std::atomic<unsigned long long> total_retries;
    /* one latency recorder per worker thread, only populated when recording */
    vector<unique_ptr<gestalt::perf::ClientLatencyRecorder>> thread_perf_lat;
    const auto reset_perf = [&] (unsigned thread_nr)
    {
        thread_perf_lat.resize(thread_nr);
        for (auto &pr : thread_perf_lat)
            pr = std::make_unique<gestalt::perf::ClientLatencyRecorder>();
    };
    /* mark the start of an op: reads -> read_tick(), inserts/updates -> write_tick() */
    const auto start_perf_op = [&] (
        gestalt::perf::ClientLatencyRecorder &lat_rec,
        const std::remove_cvref_t<decltype(ycsb_run.front())> &op)
    {
        using Op = decltype(op.op);
        switch (op.op) {
        case Op::READ: {
            lat_rec.read_tick();
            break;
        }
        case Op::INSERT: case Op::UPDATE: {
            lat_rec.write_tick();
            break;
        }
        default:
            throw std::runtime_error("unknown op type");
        }
    };
    /* mark the end of an op, mirroring start_perf_op() */
    const auto end_perf_op = [&] (
        gestalt::perf::ClientLatencyRecorder &lat_rec,
        const std::remove_cvref_t<decltype(ycsb_run.front())> &op)
    {
        using Op = decltype(op.op);
        switch (op.op) {
        case Op::READ: {
            lat_rec.read_tock();
            break;
        }
        case Op::INSERT: case Op::UPDATE: {
            lat_rec.write_tock();
            break;
        }
        default:
            throw std::runtime_error("unknown op type");
        }
    };

    /* Body of one worker thread. It sets up its own client, reports ready and
        spins until the start signal. It then replays its trace cyclically until
        the end signal and adds its completed-op count to total_completes_ops.
        An exception escaping this function terminates the process, because
        std::jthread does not propagate it. */
    const auto thread_test_fn = [&] (const unsigned thread_id) {
        const auto perf_lat_rec = record_latency ? thread_perf_lat.at(thread_id).get() : nullptr;
        gestalt::Client worker_client(config_path, client_id + 200 + thread_id);
        /* DEBUG: causing RNIC to throw bad work request more frequently when
            thread count is high, don't know why.
            Not heating up cache introduces less than 0.1us increase to final
            average latency metric (spread among 1e6 ops), don't bother. */
        #if 0
        /* heat up client locator cache */
        for (const auto &d : ycsb_load) {
            if (int r = worker_client.get(d.okey.c_str()); r) {
                [[unlikely]] if (r == -EINVAL)
                    continue;
                BOOST_LOG_TRIVIAL(error) << "failed to read " << d.okey
                    << " : " << std::strerror(-r);
                errno = -r;
                boost_log_errno_throw(Client::get locator-cache);
            }
        }
        std::this_thread::sleep_for(2s);
        #endif

        /* requests to invalid entries will not travel the same I/O path, thus
            polluting benchmark result, we should skip them */
        unordered_set<std::string> invalid_entries;

        thread_ready_count++;
        while (!thread_start_flag)
            [[unlikely]] ;

        const auto &thread_run_workload = thread_run[thread_id];
        const auto thread_run_size = thread_run_workload.size();  // > 0, checked in main
        unsigned long long completed_ops = 0;
        for (size_t i = 0; !thread_end_flag; i = (i + 1) % thread_run_size) {
            const auto &d = thread_run_workload[i];
            using Op = decltype(d.op);
            /* skip invalid entry */
            [[likely]] if constexpr (gestalt_bench::only_valid_data) {
                if (invalid_entries.count(d.okey))
                    [[unlikely]] continue;
            }
            if (perf_lat_rec)
                start_perf_op(*perf_lat_rec, d);
            int r = 0;
            bool retry;
            do {
                retry = false;
                switch (d.op) {
                case Op::READ: {
                    if (r = worker_client.get(d.okey.c_str()); r) {
                        /* key temporarily locked */
                        [[unlikely]] if (r == -ECOMM) {
                            [[likely]] retry = true;
                            // total_retries++;
                            break;
                        }
                        if (r == -EAGAIN) {
                            [[likely]] if constexpr (!gestalt::optimization::allow_read_transient) {
                                retry = true;
                                // total_retries++;
                            }
                            break;
                        }
                        /* key not inserted */
                        if (r == -EINVAL)
                            [[likely]] break;
                        BOOST_LOG_TRIVIAL(error) << "failed to read " << d.okey
                            << " : " << std::strerror(-r);
                        errno = -r;
                        boost_log_errno_throw(Client::get);
                    }
                    break;
                }
                case Op::UPDATE: {
                    if constexpr (!gestalt::optimization::use_gestalt_bufferlist) {
                        uint8_t buf[4_K];
                        stamp_key_into_payload(buf, sizeof(buf), d.okey);
                        r = worker_client.put(d.okey.c_str(), buf, sizeof(buf));
                    }
                    else {
                        auto &bl = worker_client.write_op->buf;
                        auto &slot = bl.data()[0];
                        slot.meta.set_key(d.okey);
                        /* NOTE(review): assumes the slot buffer holds at least
                            4_K bytes, as bl.reformat(4_K) below suggests */
                        stamp_key_into_payload(
                            reinterpret_cast<char*>(slot.data.get()), 4_K, d.okey);
                        bl.reformat(4_K);
                        r = worker_client.put();
                    }
                    if (r) {
                        /* key temporarily locked */
                        [[unlikely]] if (r == -EBUSY) {
                            [[likely]] retry = true;
                            // total_retries++;
                            break;
                        }
                        /* key not inserted */
                        if (r == -EDQUOT)
                            [[likely]] break;
                        BOOST_LOG_TRIVIAL(error) << "failed to update " << d.okey
                            << " : " << std::strerror(-r);
                        errno = -r;
                        boost_log_errno_throw(Client::put);
                    }
                    break;
                }
                default:
                    throw std::runtime_error("unexpected run op");
                }
            } while (retry);
            /* record invalid entry, and exclude them from metrics */
            if constexpr (gestalt_bench::only_valid_data) {
                if (r) {
                    [[unlikely]] invalid_entries.insert(d.okey);
                    continue;
                }
            }
            completed_ops++;
            if (perf_lat_rec)
                end_perf_op(*perf_lat_rec, d);
        }

        total_completes_ops += completed_ops;
    };

    const std::chrono::seconds test_duration{bench_duration_sec};
    BOOST_LOG_TRIVIAL(info) << "Start benchmarking, each set will run for "
        << test_duration.count() << "s";
    // thread num -> total completed ops
    map<unsigned, unsigned long long> thread_test_metrics;
    for (const auto &tnr : thread_nr_to_test) {
        BOOST_LOG_TRIVIAL(info) << "Running test for " << tnr << "-threads";

        thread_start_flag = false;
        thread_end_flag = false;
        thread_ready_count = 0;
        total_completes_ops = 0;
        // total_retries = 0;
        if (record_latency)
            reset_perf(tnr);

        vector<std::jthread> pool;
        pool.reserve(tnr);
        for (unsigned i = 0; i < tnr; i++)
            pool.emplace_back(thread_test_fn, i);
        /* wait until every worker has its client set up */
        while (thread_ready_count != tnr)
            std::this_thread::yield();

        BOOST_LOG_TRIVIAL(info) << "Starting test ...";
        /* steady_clock keeps the window immune to wall-clock adjustments.
            Sleeping instead of spinning leaves the CPU to the workers. */
        const auto start = std::chrono::steady_clock::now();
        thread_start_flag = true;
        const auto end = start + test_duration;
        std::this_thread::sleep_until(end);
        thread_end_flag = true;

        /* wait for all threads to update metrics and end (jthread joins on
            destruction) */
        pool.clear();
        BOOST_LOG_TRIVIAL(info) << "Finished test for " << tnr << "-threads";
        // BOOST_LOG_TRIVIAL(info) << "total retires " << total_retries.load();

        thread_test_metrics[tnr] = total_completes_ops;

        if (record_latency) {
            const filesystem::path run_dir =
                record_latency_basedir / (std::to_string(tnr) + "thrd");
            filesystem::create_directories(run_dir);
            for (unsigned i = 0; i < tnr; i++) {
                thread_perf_lat.at(i)->dump(
                    run_dir / ("thrd"s + std::to_string(i) + "_read.csv"),
                    run_dir / ("thrd"s + std::to_string(i) + "_write.csv"));
            }
            BOOST_LOG_TRIVIAL(info) << "Dumped recorded latency to " << run_dir;
        }
    }


    BOOST_LOG_TRIVIAL(info) << "Benchmark finished, each set ran for "
        << test_duration.count() << "s";
    /* Each BOOST_LOG_TRIVIAL statement builds its own record, so stream
        manipulators set on one line are not guaranteed to carry over to the
        next. Apply them on every line of the table. */
    BOOST_LOG_TRIVIAL(info) << std::left << std::fixed
        << std::setw(8) << "thrd"
        << std::setw(16) << "avg lat (us)"
        << std::setw(16) << "Miops"
        << std::setw(16) << "bw (GiB/s)";
    for (const auto &[tnr, ops] : thread_test_metrics) {
        /* average per-op latency, assuming every thread was busy for the whole
            window (inf if no operation completed) */
        const double lat_us = 1e6 * test_duration.count() / ops * tnr;
        const double miops = 1. * ops / 1e6 / test_duration.count();
        /* bandwidth assumes a 4_K payload per operation */
        const double bw_GiB = 1. * ops * 4_K / 1_G / test_duration.count();
        BOOST_LOG_TRIVIAL(info) << std::left << std::fixed
            << std::setw(8) << tnr
            << std::setw(16) << lat_us
            << std::setw(16) << miops
            << std::setw(16) << bw_GiB;
    }

    return EXIT_SUCCESS;
}
